<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN"
        "http://www.w3.org/TR/html4/loose.dtd">

<html lang="en">

<head>
<meta name=Title content="HCatalog Streaming Mutation API">
<meta name=Keywords content="HCatalog Streaming Mutation ACID">
<meta http-equiv=Content-Type content="text/html; charset=utf-8">
<title>HCatalog Streaming Mutation API</title>
</head>

<body>

<h1>HCatalog Streaming Mutation API -- high level description</h1>
<h2>@deprecated as of Hive 3.0.0</h2>
<h2>Background</h2>
<p>
In certain data processing use cases it is necessary to modify existing
data when new facts arrive. An example of this is the classic ETL merge
where a copy of a data set is kept in sync with a master by the frequent
application of deltas. The deltas describe the mutations (inserts,
updates, deletes) that have occurred to the master since the previous
sync. To implement such a case using Hadoop traditionally demands that
the partitions containing records targeted by the mutations be
rewritten. This is a coarse approach; a partition containing millions of
records might be rebuilt because of a single record change. Additionally
these partitions cannot be restated atomically; at some point the old
partition data must be swapped with the new partition data. When this
swap occurs, usually by issuing an HDFS
<code>rm</code>
followed by a
<code>mv</code>,
there is a window in which the data appears to be unavailable, and
any downstream jobs consuming the data might unexpectedly fail.
Therefore data processing patterns that restate raw data on HDFS cannot
operate robustly without some external mechanism to orchestrate
concurrent access to changing data.
</p>

<p>
The availability of ACID tables in Hive provides a mechanism that both
enables concurrent access to data stored in HDFS (so long as it's in the
ORC+ACID format), and also permits row level mutations of records within
a table, without the need to rewrite the existing data. But while Hive
itself supports
<code>INSERT</code>
,
<code>UPDATE</code>
and
<code>DELETE</code>
commands, and the ORC format can support large batches of mutations in a
transaction, Hive's execution engine currently submits each individual
mutation operation in a separate transaction and issues table scans (M/R
jobs) to execute them. It does not currently scale to the demands of
processing large deltas in an atomic manner. Furthermore it would be
advantageous to extend atomic batch mutation capabilities beyond Hive by
making them available to other data processing frameworks. The Streaming
Mutation API does just this.
</p>

<p>The Streaming Mutation API, although similar to the Streaming
API, has a number of differences and is built to enable very different
use cases. Superficially, the Streaming API can only write new data
whereas the mutation API can also modify existing data. However the two
APIs are also based on very different transaction models. The Streaming API
focuses on surfacing a continuous stream of new data into a Hive table
and does so by batching small sets of writes into multiple short-lived
transactions. Conversely the mutation API is designed to infrequently
apply large sets of mutations to a data set in an atomic fashion; all
mutations will either be applied or they will not. This instead mandates
the use of a single long-lived transaction. This table summarises the
attributes of each API:</p>

<table border="1">
<thead>
<tr>
<th>Attribute</th>
<th>Streaming API</th>
<th>Mutation API</th>
</tr>
</thead>
<tbody>
<tr>
<td>Ingest type</td>
<td>Data arrives continuously</td>
<td>Ingests are performed periodically and the mutations are
applied in a single batch</td>
</tr>
<tr>
<td>Transaction scope</td>
<td>Transactions are created for small batches of writes</td>
<td>The entire set of mutations should be applied within a single
transaction</td>
</tr>
<tr>
<td>Data availability</td>
<td>Surfaces new data to users frequently and quickly</td>
<td>Change sets should be applied atomically: either the effect of
the delta is visible or it is not</td>
</tr>
<tr>
<td>Sensitive to record order</td>
<td>No, records do not have pre-existing lastTxnIds or bucketIds.
Records are likely being written into a single partition (today's date
for example)</td>
<td>Yes, all mutated records have existing <code>RecordIdentifiers</code>
and must be grouped by (partitionValues, bucketId) and sorted by
lastTxnId. These record coordinates initially arrive in an order that is
effectively random.
</td>
</tr>
<tr>
<td>Impact of a write failure</td>
<td>Transaction can be aborted and producer can choose to resubmit
failed records as ordering is not important.</td>
<td>Ingest for the respective group must be halted and failed records
resubmitted to preserve sequence.</td>
</tr>
<tr>
<td>User perception of missing data</td>
<td>Data has not arrived yet → "latency?"</td>
<td>"This data is inconsistent, some records have been updated, but
other related records have not" - consider here the classic transfer
between bank accounts scenario</td>
</tr>
<tr>
<td>API end point scope</td>
<td>A given <code>HiveEndPoint</code> instance submits many
transactions to a specific bucket, in a specific partition, of a
specific table
</td>
<td>A set of <code>MutatorCoordinators</code> write changes to an
unknown set of buckets, of an unknown set of partitions, of specific
tables (can be more than one), within a single transaction.
</td>
</tr>
</tbody>
</table>

<h2>Structure</h2>
<p>The API comprises two main concerns: transaction management, and
the writing of mutation operations to the data set. The two concerns
have a minimal coupling as it is expected that transactions will be
initiated from a single job launcher type process while the writing of
mutations will be scaled out across any number of worker nodes. In the
context of Hadoop M/R these can be more concretely defined as the Tool
and Map/Reduce task components. However, use of this architecture is not
mandated and in fact both concerns could be handled within a single
simple process depending on the requirements.</p>

<p>Note that a suitably configured Hive instance is required to
operate this system even if you do not intend to access the data from
within Hive. Internally, transactions are managed by the Hive MetaStore.
Mutations are performed to HDFS via ORC APIs that bypass the MetaStore.
Additionally you may wish to configure your MetaStore instance to
perform periodic data compactions.</p>

<p>
<b>Note on packaging</b>: The APIs are defined in the <b>org.apache.hive.hcatalog.streaming.mutate</b>
Java package and included in the hive-hcatalog-streaming jar.
</p>

<h2>Data requirements</h2>
<p>
Generally speaking, to apply a mutation to a record one must have some
unique key that identifies the record. However, primary keys are not a
construct provided by Hive. Internally Hive uses
<code>RecordIdentifiers</code>
stored in a virtual
<code>ROW__ID</code>
column to uniquely identify records within an ACID table. Therefore,
any process that wishes to issue mutations to a table via this API must
have available the corresponding row ids for the target records. What
this means in practice is that the process issuing mutations must first
read in a current snapshot of the data and then join the mutations on some
domain specific primary key to obtain the corresponding Hive
<code>ROW__ID</code>
. This is effectively what occurs within Hive's table scan process when
an
<code>UPDATE</code>
or
<code>DELETE</code>
statement is executed. The
<code>AcidInputFormat</code>
provides access to this data via
<code>AcidRecordReader.getRecordIdentifier()</code>
.
</p>

<p>
The implementation of the ACID format places some constraints on the
order in which records are written and it is important that this
ordering is enforced. Additionally, data must be grouped appropriately
to adhere to the constraints imposed by the
<code>OrcRecordUpdater</code>
. Grouping also makes it possible to parallelise the writing of mutations
for the purposes of scaling. Finally, to correctly bucket new records
(inserts) there is a slightly unintuitive trick that must be applied.
</p>

<p>All of these data sequencing concerns are the responsibility of
the client process calling the API, which is assumed to have first class
grouping and sorting capabilities (Hadoop Map/Reduce etc.). The streaming
API provides nothing more than validators that fail fast when they
encounter groups and records that are out of sequence.</p>

<p>In short, API client processes should prepare data for the mutate
API like so:</p>
<ul>
<li><b>MUST:</b> Order records by <code>ROW__ID.originalTxn</code>,
then <code>ROW__ID.rowId</code>.</li>
<li><b>MUST:</b> Assign a <code>ROW__ID</code> containing a
computed <code>bucketId</code> to records to be inserted.</li>
<li><b>SHOULD:</b> Group/partition by table partition value, then <code>ROW__ID.bucketId</code>.</li>
</ul>
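<p>The ordering rules above can be sketched as a plain comparison
function. This is a minimal, self-contained illustration only: <code>MutationKey</code>
and <code>MutationOrdering</code> are hypothetical stand-ins invented
for this sketch, not classes from the API, and in a real job the
grouping and sorting would normally be delegated to the processing
framework (the Map/Reduce shuffle, for example).</p>

```java
import java.util.Arrays;

/** Hypothetical stand-in for the coordinates of one record to be mutated. */
final class MutationKey {
  final String partition; // table partition value, "" for an unpartitioned table
  final int bucketId;     // taken from ROW__ID, or computed for inserts
  final long originalTxn; // taken from ROW__ID
  final long rowId;       // taken from ROW__ID

  MutationKey(String partition, int bucketId, long originalTxn, long rowId) {
    this.partition = partition;
    this.bucketId = bucketId;
    this.originalTxn = originalTxn;
    this.rowId = rowId;
  }

  @Override
  public String toString() {
    return partition + "/" + bucketId + "/" + originalTxn + "/" + rowId;
  }
}

final class MutationOrdering {
  /** Groups by (partition, bucketId), then orders by (originalTxn, rowId). */
  static int compare(MutationKey a, MutationKey b) {
    int c = a.partition.compareTo(b.partition);
    if (c == 0) c = Integer.compare(a.bucketId, b.bucketId);
    if (c == 0) c = Long.compare(a.originalTxn, b.originalTxn);
    if (c == 0) c = Long.compare(a.rowId, b.rowId);
    return c;
  }

  /** Returns a sorted copy, leaving the input array untouched. */
  static MutationKey[] sorted(MutationKey[] keys) {
    MutationKey[] copy = keys.clone();
    Arrays.sort(copy, MutationOrdering::compare);
    return copy;
  }

  public static void main(String[] args) {
    MutationKey[] keys = {
      new MutationKey("2015-01-02", 0, 5L, 1L),
      new MutationKey("2015-01-01", 1, 3L, 0L),
      new MutationKey("2015-01-01", 0, 7L, 2L),
      new MutationKey("2015-01-01", 0, 7L, 1L)
    };
    for (MutationKey key : sorted(keys)) {
      System.out.println(key);
    }
  }
}
```

<p>Sorting on the full (partition, bucket, transaction, row) tuple
satisfies both the MUST and the SHOULD rules at once, because records
that share a partition and bucket end up adjacent and in row id order.</p>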

<p>
The addition of bucket ids to insert records prior to grouping and
sorting seems unintuitive. However, it is required both to ensure
adequate partitioning of new data and bucket allocation consistent with
that provided by Hive. In a typical ETL the majority of mutation events
are inserts, often targeting a single partition (new data for the
previous day, hour, etc.). If more than one worker is writing said
events, were we to leave the bucket id empty then all inserts would go
to a single worker (e.g. a reducer) and the workload could be heavily
skewed. The assignment of a computed bucket allows inserts to be more
usefully distributed across workers. Additionally, when Hive is working
with the data it may expect records to have been bucketed in a way that
is consistent with its own internal scheme. A convenience interface and
implementation are provided to more easily compute and append bucket ids:
<code>BucketIdResolver</code>
and
<code>BucketIdResolverImpl</code>
.
</p>
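<p>To see why a computed bucket id spreads inserts across workers,
consider the following simplified sketch. It is an illustration under
stated assumptions, not the API's algorithm: <code>BucketSketch</code>
is a hypothetical class, and it uses Java's <code>hashCode()</code>
in place of Hive's own hash function, so the bucket numbers it produces
should not be expected to match Hive's. Real clients should rely on
<code>BucketIdResolverImpl</code>.</p>

```java
final class BucketSketch {
  /**
   * Maps a bucket column value to a bucket in the range 0 to totalBuckets - 1.
   * Assumption: Object.hashCode() stands in for Hive's own hash function.
   */
  static int bucketFor(Object bucketColumnValue, int totalBuckets) {
    if (Integer.signum(totalBuckets) != 1) {
      throw new IllegalArgumentException("totalBuckets must be positive");
    }
    int hash = bucketColumnValue == null ? 0 : bucketColumnValue.hashCode();
    // Clear the sign bit so that the remainder is never negative.
    return (hash & Integer.MAX_VALUE) % totalBuckets;
  }

  public static void main(String[] args) {
    String[] customerIds = {"c-1", "c-2", "c-3", "c-4"};
    for (String id : customerIds) {
      System.out.println(id + " goes to bucket " + bucketFor(id, 10));
    }
  }
}
```

<p>Because the bucket depends only on the bucket column value, the same
key always lands in the same bucket, while different keys are spread
over the available buckets and therefore over the workers.</p>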

<p>Update operations should not attempt to modify values of
partition or bucketing columns. The API does not prevent this and such
attempts could lead to data corruption.</p>

<h2>Streaming requirements</h2>
<p>A few things are currently required to use streaming.</p>

<ol>
<li>Currently, only the ORC storage format is supported. So '<b>stored
as orc</b>' must be specified during table creation.
</li>
<li>The Hive table must be bucketed, but not sorted. So something
like '<b>clustered by (<i>colName</i>) into <i>10</i> buckets
</b>' must be specified during table creation.
</li>
<li>The user running the client streaming process must have the necessary
permissions to write to the table or partition and create partitions in
the table.</li>
<li>Settings required in hive-site.xml for the Metastore:
<ol>
<li><b>hive.txn.manager =
org.apache.hadoop.hive.ql.lockmgr.DbTxnManager</b></li>
<li><b>hive.support.concurrency = true </b></li>
<li><b>hive.compactor.initiator.on = true</b></li>
<li><b>hive.compactor.worker.threads &gt; 0 </b></li>
</ol>
</li>
</ol>

<p>
<b>Note:</b> Streaming mutations to <b>unpartitioned</b> tables is also
supported.
</p>

<h2>Record layout</h2>
<p>
The structure, layout, and encoding of records is the exclusive concern
of the client ETL mutation process and may be quite different from the
target Hive ACID table. The mutation API requires concrete
implementations of the
<code>MutatorFactory</code>
and
<code>Mutator</code>
classes to extract pertinent data from records and serialize data into
the ACID files. Fortunately base classes are provided (
<code>AbstractMutator</code>
,
<code>RecordInspectorImpl</code>
) to simplify this effort and usually all that is required is the
specification of a suitable
<code>ObjectInspector</code>
and the provision of the indexes of the
<code>ROW__ID</code>
and bucketed columns within the record structure. Note that all column
indexes in these classes are with respect to your record structure, not
the Hive table structure.
</p>
<p>
You will likely also want to use a
<code>BucketIdResolver</code>
to append bucket ids to new records for insertion. Fortunately the core
implementation is provided in
<code>BucketIdResolverImpl</code>
but note that bucket column indexes must be presented in the same order
as they are in the Hive table definition to ensure consistent bucketing.
Note that you cannot move records between buckets and an exception will
be thrown if you attempt to do so. In real terms this means that you
should not attempt to modify the values in bucket columns with an
<code>UPDATE</code>
.
</p>

<h2>Connection and Transaction management</h2>
<p>
The
<code>MutatorClient</code>
class is used to create and manage transactions in which mutations can
be performed. The scope of a transaction can extend across multiple ACID
tables. When a client connects it communicates with the meta store to
verify and acquire meta data for the target tables. An invocation of
<code>newTransaction</code>
then opens a transaction with the meta store, finalizes a collection of
<code>AcidTables</code>
and returns a new
<code>Transaction</code>
instance. The acid tables are light-weight, serializable objects that
are used by the mutation writing components of the API to target
specific ACID file locations. Usually your
<code>MutatorClient</code>
will be running on some master node and your coordinators on worker
nodes. In this event the
<code>AcidTableSerializer</code>
can be used to encode the tables in a more transportable form, for use
as a
<code>Configuration</code>
property for example.
</p>
<p>
As you would expect, a
<code>Transaction</code>
must be initiated with a call to
<code>begin</code>
before any mutations can be applied. This invocation acquires a lock on
the targeted tables using the meta store, and initiates a heartbeat to
prevent transaction timeouts. It is highly recommended that you register
a
<code>LockFailureListener</code>
with the client so that your process can handle any lock or transaction
failures. Typically you may wish to abort the job in the event of such
an error. With the transaction in place you can now start streaming
mutations with one or more
<code>MutatorCoordinator</code>
instances (more on this later), and can finally
<code>commit</code>
or
<code>abort</code>
the transaction when the change set has been applied, which will release
the lock with the meta store client. Finally you should
<code>close</code>
the mutation client to release any held resources.
</p>
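<p>The begin, mutate, then commit-or-abort sequence described above can
be sketched as follows. This is a self-contained illustration, not
client code for the API: <code>TxnHandle</code> and <code>TxnLifecycle</code>
are hypothetical stand-ins, and lock handling, heartbeats and closing
the client are deliberately left out.</p>

```java
/** Hypothetical stand-in for a transaction handle; not the API's own type. */
interface TxnHandle {
  void begin();
  void commit();
  void abort();
}

final class TxnLifecycle {
  /**
   * Applies a whole change set inside one transaction.
   * Returns true if the change set was committed, false if it was aborted.
   */
  static boolean apply(TxnHandle txn, Runnable changeSet) {
    txn.begin();
    try {
      changeSet.run(); // stream every mutation before deciding the outcome
    } catch (RuntimeException e) {
      txn.abort();     // nothing from a failed change set becomes visible
      return false;
    }
    txn.commit();      // the whole change set becomes visible at once
    return true;
  }

  private static void printMutations() {
    System.out.println("apply mutations");
  }

  public static void main(String[] args) {
    TxnHandle printing = new TxnHandle() {
      @Override public void begin() { System.out.println("begin"); }
      @Override public void commit() { System.out.println("commit"); }
      @Override public void abort() { System.out.println("abort"); }
    };
    boolean committed = apply(printing, TxnLifecycle::printMutations);
    System.out.println("committed = " + committed);
  }
}
```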
<p>
The
<code>MutatorClientBuilder</code>
is provided to simplify the construction of clients.
</p>

<p>
<b>WARNING:</b> Hive doesn't currently have a deadlock detector (it is
being worked on as part of <a
href="https://issues.apache.org/jira/browse/HIVE-9675">HIVE-9675</a>).
This API could potentially deadlock with other stream writers or with
SQL users.
</p>
<h2>Writing data</h2>

<p>
The
<code>MutatorCoordinator</code>
class is used to issue mutations to an ACID table. You will require at
least one instance per table participating in the transaction. The
target of a given instance is defined by the respective
<code>AcidTable</code>
used to construct the coordinator. It is recommended that a
<code>MutatorCoordinatorBuilder</code>
is used to simplify the construction process.
</p>

<p>
Mutations can be applied by invoking the respective
<code>insert</code>
,
<code>update</code>
, and
<code>delete</code>
methods on the coordinator. These methods each take as parameters the
target partition of the record and the mutated record. In the case of an
unpartitioned table you should simply pass an empty list as the
partition value. For inserts specifically, only the bucket id will be
extracted from the
<code>RecordIdentifier</code>
, the writeId and rowId will be ignored and replaced by
appropriate values in the
<code>RecordUpdater</code>
. Additionally, in the case of deletes, everything but the
<code>RecordIdentifier</code>
in the record will be ignored and therefore it is often easier to simply
submit the original record.
</p>

<p>
<b>Caution:</b> As mentioned previously, mutations must arrive in a
specific order for the resultant table data to be consistent.
Coordinators will verify a naturally ordered sequence of
(writeId, rowId) and will throw an exception if this sequence
is broken. This exception should almost certainly be escalated so that
the transaction is aborted. This, along with the correct ordering of the
data, is the responsibility of the client using the API.
</p>
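<p>The kind of fail-fast sequence check described above can be
sketched as follows. This is an illustration, not the coordinator's
actual validator: <code>SequenceCheck</code> is a hypothetical class,
one instance is assumed per (partition, bucket) group, and rejecting a
repeated (writeId, rowId) pair is an assumption of this sketch.</p>

```java
final class SequenceCheck {
  private boolean seenAny = false;
  private long lastWriteId;
  private long lastRowId;

  /** Accepts the next (writeId, rowId) pair or throws if it breaks the order. */
  void accept(long writeId, long rowId) {
    if (seenAny) {
      int c = Long.compare(writeId, lastWriteId);
      if (c == 0) c = Long.compare(rowId, lastRowId);
      // Assumption: the sequence must be strictly increasing.
      if (Integer.signum(c) != 1) {
        throw new IllegalStateException("Out of sequence: (" + writeId + ", " + rowId
            + ") after (" + lastWriteId + ", " + lastRowId + ")");
      }
    }
    seenAny = true;
    lastWriteId = writeId;
    lastRowId = rowId;
  }

  public static void main(String[] args) {
    SequenceCheck check = new SequenceCheck();
    check.accept(1L, 0L);
    check.accept(1L, 1L);
    check.accept(2L, 0L);
    try {
      check.accept(1L, 5L); // an earlier writeId arrives late
    } catch (IllegalStateException e) {
      System.out.println(e.getMessage());
    }
  }
}
```

<p>A caller that sees this kind of exception would typically stop
issuing mutations and abort the transaction rather than attempt to
continue.</p>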

<h3>Dynamic Partition Creation:</h3>
<p>
It is often desirable to have new partitions created
automatically (say, on an hourly basis). In such cases requiring the Hive
admin to pre-create the necessary partitions may not be reasonable. The
API allows coordinators to create partitions as needed (see:
<code>MutatorClientBuilder.addSinkTable(String, String, boolean)</code>
). Partition creation being an atomic action, multiple coordinators can
race to create the partition, but only one would succeed, so
coordinators need not synchronize when creating a partition. The
user of the coordinator process needs to be given write permissions on
the Hive table in order to create partitions.
</p>

<p>Care must be taken when using this option as it requires that the
coordinators maintain a connection with the meta store database. When
coordinators are running in a distributed environment (as is likely the
case) it is possible for them to overwhelm the meta store. In such cases it
may be better to disable partition creation and collect a set of
affected partitions as part of your ETL merge process. These can then be
created with a single meta store connection in your client code, once
the cluster side merge process is complete.</p>
<p>
Finally, note that when partition creation is disabled the coordinators
must synthesize the partition URI as they cannot retrieve it from the
meta store. This may cause problems if the layout of your partitions in
HDFS does not follow the Hive standard (as implemented in
<code>org.apache.hadoop.hive.metastore.Warehouse.getPartitionPath(Path,
LinkedHashMap&lt;String, String&gt;)</code>).
</p>
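<p>For orientation, the default Hive layout places each partition in
nested <code>name=value</code> directories beneath the table location.
The sketch below illustrates that layout only: <code>PartitionPathSketch</code>
is a hypothetical helper, it is not the <code>Warehouse</code>
implementation, and it performs no escaping of special characters in
partition values.</p>

```java
import java.util.StringJoiner;

final class PartitionPathSketch {
  /** Joins the table location with one name=value directory per partition column. */
  static String partitionPath(String tableLocation, String[] names, String[] values) {
    if (names.length != values.length) {
      throw new IllegalArgumentException("Each partition column needs exactly one value");
    }
    StringJoiner path = new StringJoiner("/");
    path.add(tableLocation);
    for (int i = 0; i != names.length; i++) {
      path.add(names[i] + "=" + values[i]);
    }
    return path.toString();
  }

  public static void main(String[] args) {
    String[] names = {"dt", "hr"};
    String[] values = {"2015-01-01", "12"};
    System.out.println(partitionPath("/warehouse/db/events", names, values));
  }
}
```

<p>If your partitions live anywhere other than where such a rule would
place them, synthesized locations will not match, which is the problem
the paragraph above warns about.</p>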

<h2>Reading data</h2>

<p>
Although this API is concerned with writing changes to data, as
previously stated we'll almost certainly have to read the existing data
first to obtain the relevant
<code>ROW__ID</code> values.
Therefore it is worth noting that reading ACID data in a robust and
consistent manner requires the following:
</p>
<ol>
<li>Obtaining a valid transaction list from the meta store (<code>ValidTxnList</code>).
</li>
<li>Acquiring a lock with the meta store and issuing heartbeats (<code>LockImpl</code>
can help with this).
</li>
<li>Configuring the <code>OrcInputFormat</code> and then reading
the data. Make sure that you also pull in the <code>ROW__ID</code>
values. See: <code>AcidRecordReader.getRecordIdentifier</code>.
</li>
<li>Releasing the lock.</li>
</ol>

<h2>Example</h2>
<p>
<img src="doc-files/system-overview.png" />
</p>
<p>So to recap, the sequence of events required to apply mutations
to a dataset using the API is:</p>
<ol>
<li>Create a <code>MutatorClient</code> to manage a transaction for
the targeted ACID tables. This set of tables should include any
transactional destinations or sources. Don't forget to register a <code>LockFailureListener</code>
so that you can handle transaction failures.
</li>
<li>Open a new <code>Transaction</code> with the client.
</li>
<li>Get the <code>AcidTables</code> from the client.
</li>
<li>Begin the transaction.</li>
<li>Create at least one <code>MutatorCoordinator</code> for each
table. The <code>AcidTableSerializer</code> can help you transport the <code>AcidTables</code>
when your workers are in a distributed environment.
</li>
<li>Compute your mutation set (this is your ETL merge process).</li>
<li>Optionally: collect the set of affected partitions.</li>
<li>Append bucket ids to insertion records. A <code>BucketIdResolver</code>
can help here.
</li>
<li>Group and sort your data appropriately.</li>
<li>Issue mutation events to your coordinators.</li>
<li>Close your coordinators.</li>
<li>Abort or commit the transaction.</li>
<li>Close your mutation client.</li>
<li>Optionally: create any affected partitions that do not exist in
the meta store.</li>
</ol>
<p>
See
<code>ExampleUseCase</code>
and
<code>TestMutations.testUpdatesAndDeletes()</code>
for some very simple usages.
</p>

</body>

</html>
